{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 7.3 多线程输入数据处理框架\n",
    "上节介绍的图像预处理方法虽然可以减小无关因素对图像识别模型的影响，但是这些**复杂的预处理过程也会减慢整个训练过程。为了避免图像预处理成为神经网络模型训练效率的瓶颈，TensorFlow提供了一套多线程处理输入数据的框架。**下图总结了一个经典的输入数据处理的流程，在以下各小节将会分别介绍：\n",
    "<p align='center'>\n",
    "    <img src=images/图7.14.JPG>\n",
    "</p>\n",
    "\n",
    "7.3.1节将首先介绍TensorFlow中队列的概念，7.3.2节会介绍如何再TensorFlow中实现前三步，上图中的数据预处理已经在7.2节中详细介绍，然后在7.3.3节会介绍最后一个流程，最后在7.3.4节会给出一个完整的TensorFlow程序展示整个输入数据处理框架。\n",
    "\n",
    "### 7.3.1 队列与多线程\n",
    "*队列是TensorFlow多线程输入数据处理框架的基础。虽然相比而言，新推出的数据集（Dataset，7.4节会介绍）处理框架更加容易使用和高效，但是由于队列在很多开源项目中仍被广泛使用，并且其较为低层的API有助于读者理解数据输入背后的难点和原理，因此本书仍然建议读者了解队列的相关知识。*\n",
    "\n",
    "在TensorFlow中，队列和变量类似，都是计算图上有状态的节点。其他的计算节点可以修改他们的状态。对于变量，可以通过赋值操作修改变量的取值；对于队列，修改状态的操作主要有`Enqueue`、`EnqueueMany`和`Dequeue`。以下程序展示了如何使用它们来操作一个队列："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "0\n",
      "10\n",
      "1\n",
      "11\n",
      "2\n"
     ]
    }
   ],
   "source": [
    "import tensorflow as tf\n",
    "\n",
    "# 创建一个先进先出队列，指定队列中最多可以保存两个元素，并且指定为整形\n",
    "q = tf.FIFOQueue(2, \"int32\")\n",
    "\n",
    "# 使用enqueue_many函数来初始化队列中的元素，\n",
    "# 和变量初始化类似，使用队列前需要明确调用这个初始化过程\n",
    "init = q.enqueue_many(([0, 10],))\n",
    "\n",
    "# 使用dequeue函数将队列中第一个元素出队列，这个元素的值将被保存在x中\n",
    "x = q.dequeue()\n",
    "# 出队元素加1\n",
    "y = x + 1\n",
    "# 将加1后的值重新加入队列\n",
    "q_inc = q.enqueue([y])\n",
    "\n",
    "with tf.Session() as sess:\n",
    "    # 队列初始化操作\n",
    "    init.run()\n",
    "    for _ in range(5):\n",
    "        # 运行q_inc将执行数据出队列、出队的元素加1、重新加入队列这个过程\n",
    "        v, _ = sess.run([x, q_inc])\n",
    "        # 打印出队元素的值\n",
    "        # 队列开始有［0, 10］两个元素，第一个出队的为0 ，加1之后再次入队得到的队列为［10, 1］：\n",
    "        # 第二次出队的为10，加1之后入队的为11，得到的队列为［1, 11］；以此类推.\n",
    "        print(v)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**在TensorFlow中两种队列，除了上面的FIFOQueue这种队列，还有RandomShuffleQueue这种队列，**后者会将队列中的元素打乱，每次出队操作得到的是从当前队列所有元素中随机选择的一个，在训练神经网络时希望每次使用的数据尽量随机，RandomShuffleQueue就提供了这样的功能。\n",
    "\n",
    "**在TensorFlow中，队列不仅仅是一种数据结构，还是异步计算张量取值的一个重要机制。比如多个线程可以同时向一个队列中写元素，或者同时读取一个队列中的元素。**在后面的小节中将具体介绍TensorFlow是如何利用队列来实现多线程输入数据处理的。在本节之后的内容中将先介绍TensorFlow提供的辅助函数来更好地协同不同的线程。\n",
    "\n",
    "TensorFlow提供了tf.Coordinator和tf.QueueRunner两个类来完成**多线程协同**的功能:\n",
    "\n",
    "- **tf.Coordinator主要用于协同多个线程一起停止**，并提供了`should_stop`、`request_ stop`和`join`三个函数。在启动线程之前，需要先声明一个tf.Coordinator类，并将这个类传入每一个创建的线程中。**启动的线程需要一直查询tf.Coordinator类中提供的should_stop函数，当这个函数的返回值为True时，则当前线程也需要退出。每一个启动的线程都可以通过调用request_stop函数来通知其他线程退出。**当某一个线程调用request_stop函数之后，should_stop函数的返回值将被设置为True这样其他的线程就可以同时终止了。以下程序展示了如何使用tf.Coordinator:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Working on id: 0\n",
      "\n",
      "Working on id: 1\n",
      "\n",
      "Working on id: 2\n",
      "\n",
      "Working on id: 3\n",
      "\n",
      "Working on id: 4\n",
      "\n",
      "Stoping from id: 1\n",
      "Working on id: 2\n",
      "Working on id: 0\n",
      "\n",
      "\n",
      "\n"
     ]
    }
   ],
   "source": [
    "import numpy as np\n",
    "import threading\n",
    "import time\n",
    "\n",
    "\n",
    "# 线程中运行的程序，这个程序每隔1秒判断是否需要停止并打印自己的ID\n",
    "def MyLoop(coord, worker_id):\n",
    "    # 使用tf.Coordinator类提供的协同工具判断当前线程是否需要停止\n",
    "    while not coord.should_stop():\n",
    "        # 随机停止所有线程\n",
    "        if np.random.rand() < 0.1:\n",
    "            print(\"Stoping from id: %d\\n\" % worker_id)\n",
    "            # 调用coord.request_stop函数来通知其他所有线程停止\n",
    "            coord.request_stop()\n",
    "        else:\n",
    "            print(\"Working on id: %d\\n\" % worker_id)\n",
    "        time.sleep(1)\n",
    "        \n",
    "# 声明一个tf.train.Coordinator类来协同多个线程\n",
    "coord = tf.train.Coordinator()\n",
    "# 声明创建5个线程\n",
    "threads = [threading.Thread(target=MyLoop, args=(coord, i,)) for i in range(5)]\n",
    "# 启动所有线程\n",
    "for t in threads:t.start()\n",
    "# 等待所有线程停止\n",
    "coord.join(threads)\n",
    "\n",
    "# 当所有线程启动之后，每个线程会打印各自的ID，于是前面4行打印出了它们的ID。\n",
    "# 然后在暂停1秒之后，所有线程又开始第二遍打印ID。在这个时候有一个线程退出的条件达到，\n",
    "# 于是调用了coord.request_stop函数来停止所有其他的线程。然而在打印'Stoping from id: 1'之后，\n",
    "# 可以看到有线程仍然在输出这是因为这些线程已经执行完coord.should_stop的判断，\n",
    "# 于是仍然会继续输出自己的ID，但在下一轮判断是否需要停止时将退出线程。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "- **tf.QueueRunner主要用于启动多个线程来操作同一个队列**，启动的这些线程可以通过上面介绍的tf.Coordinator类来统一管理。以下代码展示了如何使用tf.QueueRunner和tf.Coordinator来管理多线程队列操作："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "WARNING:tensorflow:From <ipython-input-14-f04267ff0251>:9: QueueRunner.__init__ (from tensorflow.python.training.queue_runner_impl) is deprecated and will be removed in a future version.\n",
      "Instructions for updating:\n",
      "To construct input pipelines, use the `tf.data` module.\n",
      "WARNING:tensorflow:From <ipython-input-14-f04267ff0251>:12: add_queue_runner (from tensorflow.python.training.queue_runner_impl) is deprecated and will be removed in a future version.\n",
      "Instructions for updating:\n",
      "To construct input pipelines, use the `tf.data` module.\n",
      "WARNING:tensorflow:From <ipython-input-14-f04267ff0251>:25: start_queue_runners (from tensorflow.python.training.queue_runner_impl) is deprecated and will be removed in a future version.\n",
      "Instructions for updating:\n",
      "To construct input pipelines, use the `tf.data` module.\n",
      "-1.5000209\n",
      "-0.80331206\n",
      "0.9710177\n"
     ]
    }
   ],
   "source": [
    "# 先声明一个先进先出的队列，队列中最多100个元素，类型为实数型\n",
    "queue = tf.FIFOQueue(100, \"float\")\n",
    "# 定义队列的入队操作\n",
    "enqueue_op = queue.enqueue([tf.random_normal([1])])\n",
    "\n",
    "# 使用tf.train.QueueRunner来创建多个线程运行队列的入队操作\n",
    "#   第一个参数给出了被操作的队列；\n",
    "#   第二个参数表示需要启动5个线程，每个线程中运行的是enqueue_op操作\n",
    "qr = tf.train.QueueRunner(queue, [enqueue_op] * 5)\n",
    "\n",
    "# 将定义过的QueueRunner加入TensorFlow计算图上指定的集合，默认tf.GraphKeys,QUEUE_RUNNERS\n",
    "tf.train.add_queue_runner(qr)\n",
    "# 定义出队操作\n",
    "out_tensor = queue.dequeue()\n",
    "\n",
    "with tf.Session() as sess:\n",
    "    # 使用tf.train.Coordinator来协同启动的线程\n",
    "    coord = tf.train.Coordinator()\n",
    "    \n",
    "    # 使用tf.train.QueueRunner时，需要明确调用tf.train.start_queue_runners来启动所有线程，\n",
    "    # 否则因为没有线程运行入队操作，当调用出队操作时，程序会一直等待入队操作被运行。\n",
    "    # tf.train.start_queue_runners函数会默认启动tf.GraphKeys.QUEUE_RUNNERS集合中所有的QueueRunner，\n",
    "    # 因为这个函数只支持启动指定集合中的QueueRnner，所以一般来说\n",
    "    # tf.train.add_queue_runner函数和tf.train.start_queue_runners函数会指定同一个集合。\n",
    "    threads = tf.train.start_queue_runners(sess=sess, coord=coord)\n",
    "    # 获取队列中的值\n",
    "    for _ in range(3):\n",
    "        print(sess.run(out_tensor)[0])\n",
    "    \n",
    "    # 使用tf.train.Coordinator来停止所有线程\n",
    "    coord.request_stop()\n",
    "    coord.join(threads)\n",
    "    \n",
    "# 以上程序将启动5个线程来执行队列入队的操作，其中每一个线程都是将随机数写入队列。\n",
    "# 于是在每次运行出队操作时，可以得到一个随机数"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 7.3.2 输入文件队列\n",
    "本节将介绍如何使用TensorFlow中的队列管理输入文件列表。在这一节中，假设所有的输入数据都已经整理成了TFRecord格式。**虽然一个TFRecord文件中可以存储多个训练样例，但是当训练数据较大时，可以将数据分成多个TFRecord文件来提高处理效率。**\n",
    "\n",
    "TensorFlow提供了`tf.train.match_filenames_once`来获取符合一个正则表达式的所有文件列表，得到的文件列表可以通过`tf.train.string_input_producer`函数进行有效的管理：\n",
    "- `tf.train.string_input_producer`函数会将初始化时提供的文件列表创建一个输入队列，输入队列中原始的元素为文件列表中的所有文件；\n",
    "- 通过设置shuffle参数，`tf.train.string_input_producer`函数支持支持随机打乱文件列表中的文件出队顺序，随机打乱文件顺序以及加入输入队列的过程会跑在一个单独的线程上，这样不会影响获取文件的速度；\n",
    "- `tf.train.string_input_producer`生成的输入队列可以同时被多个文件读取线程操作，而且输入队列会将队列中的文件均匀地分给不同的线程，不出现有些文件被处理过多次而有些文件还没有被处理过的情况；\n",
    "- 通过设置num_epoches参数，`tf.train.string_input_producer`可以限制加载初始文件的最大轮数，所有文件已经被使用了设定的轮数后如果继续尝试读取会报OutOfRange错误，测试时可设为1。\n",
    "\n",
    "在展示这两个函数的使用方法之前，下面先给出一个简单的程序来生成样例数据："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "import tensorflow as tf\n",
    "\n",
    "# 创建TFRecord文件的辅助函数\n",
    "def _int64_feature(value):\n",
    "    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))\n",
    "\n",
    "# 模拟海量数据情况下将数据写入不同的TFRecord文件\n",
    "num_shards = 2               # 定义总共写入多少个文件\n",
    "instances_per_shard = 2      # 定义每个文件中有多少个数据\n",
    "for i in range(num_shards):\n",
    "    # 将数据分为多个文件时，可以将不同文件以类似OOOOn-of-OOOOm的后缀区分。\n",
    "    # 其中m表示了数据总共被存在了多少个文件中，n表示当前文件的编号。\n",
    "    # 这样既方便了通过正则方式获取文件列表，又在文件名中加入了更多的信息。\n",
    "    filename = ('data.tfrecords-%.5d-of-%.5d' % (i, num_shards)) \n",
    "    \n",
    "    # 将Example结构写入TFRecord文件\n",
    "    writer = tf.python_io.TFRecordWriter(filename)\n",
    "    for j in range(instances_per_shard):\n",
    "    # Example结构仅包含当前样例属于第几个文件以及是当前文件的第几个样本。\n",
    "        example = tf.train.Example(features=tf.train.Features(feature={\n",
    "            'i': _int64_feature(i),\n",
    "            'j': _int64_feature(j)}))\n",
    "        writer.write(example.SerializeToString())\n",
    "    writer.close()  "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "程序运行之后，在指定的目录下将生成两个文件：指定路径下的data.tfrecords-00000-of-00002和data.tfrecords-00001-of-00002。每一个文件中存储了两个样例。在生成了样例数据之后，以下代码展示了`tf.train.match_filenames_once`函数和`tf.train.string_input_producer`函数的使用方法:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[b'.\\\\data.tfrecords-00000-of-00002' b'.\\\\data.tfrecords-00001-of-00002']\n",
      "[0, 0]\n",
      "[0, 1]\n",
      "[1, 0]\n",
      "[1, 1]\n",
      "[0, 0]\n",
      "[0, 1]\n"
     ]
    }
   ],
   "source": [
    "# 使用tf.train.match_filenames_once函数获取文件列表\n",
    "files = tf.train.match_filenames_once(\"data.tfrecords-*\")\n",
    "\n",
    "# 使用tf.train.string_input_producer函数创建输入队列，这里shuffle，真实问题一般True\n",
    "filename_queue = tf.train.string_input_producer(files, shuffle=False)\n",
    "\n",
    "# 如7.1节中读取并解析一个样本\n",
    "reader = tf.TFRecordReader()\n",
    "_, serialized_example = reader.read(filename_queue)\n",
    "features = tf.parse_single_example(\n",
    "      serialized_example,\n",
    "      features={\n",
    "          'i': tf.FixedLenFeature([], tf.int64),\n",
    "          'j': tf.FixedLenFeature([], tf.int64),\n",
    "      })\n",
    "\n",
    "\n",
    "with tf.Session() as sess:\n",
    "    # 虽然本段程序中没有声明任何变量，但是使用tf.train.初始化一些变量\n",
    "    sess.run([tf.global_variables_initializer(), tf.local_variables_initializer()])\n",
    "    \n",
    "    # 查看文件名列表\n",
    "    print(sess.run(files))\n",
    "    \n",
    "    # 声明tf.train.Coordinator类来协同不同的线程，并启动线程\n",
    "    coord = tf.train.Coordinator()\n",
    "    threads = tf.train.start_queue_runners(sess=sess, coord=coord)\n",
    "    \n",
    "    # 多次执行获取数据的操作\n",
    "    for i in range(6):\n",
    "        print(sess.run([features['i'], features['j']]))\n",
    "    coord.request_stop()\n",
    "    coord.join(threads)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "在文件不打乱列表的情况下，会依次读出数据中的每个样例。而且当所有样例都被读完之后，程序会自动从头开始。如果限制num_epochs为1，则程序会报错。\n",
    "\n",
    "### 7.3.3 组合训练数据（batching）\n",
    "上节介绍了如何从文件列表中读取单个样例，将这些单个样例通过7.2节中介绍的预处理方法进行处理，就可以得到提供给神经网络输入层的训练数据了。在第4章介绍过，将多个输入样例组织成一个batch可以提高模型训练的效率。所以在得到单个样例的预处理结果之后，还需要将它们组织成batch，然后再提供给神经网络的输入层。\n",
    "\n",
    "TensorFlow提供了`tf.train.batch`和`tf.train.shuffle_batch`函数来将单个的样例组织成batch的形式输出。**这两个函数都会生成一个队列，队列的入队操作是生成单个样例的方法，而每次出队得到的是一个batch的样例，它们唯一的区别在于是否会将数据顺序打乱**。以下代码展示了`tf.train.batch`的使用方法:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[0 0 1] [0 1 0]\n",
      "[1 0 0] [1 0 1]\n",
      "[1 1 0] [0 1 0]\n"
     ]
    }
   ],
   "source": [
    "# 使用7.3.2中方法读取并解析得到样例，\n",
    "# 这里假设Example中i表示样例特征向量，如图像像素矩阵，y表示对应标签\n",
    "example, label = features['i'], features['j']\n",
    "\n",
    "# 一个batch中样例个数\n",
    "batch_size = 3\n",
    "# 组合样例的队列中最多可以存储的样例个数。这个队列如果太大，那么需要占用很多内在资源：\n",
    "# 如果太小，那么出队操作可能会因为没有数据而被阻碍（block），从而导致训练效率降低。\n",
    "# 一般来说这个队列的大小会和每一个batch的大小相关，下面给出了设置队列大小的一种方式。\n",
    "capacity = 1000 + 3 * batch_size\n",
    "\n",
    "# 使用tf.train.batch 函数来组合样例：\n",
    "#    ［example,label］参数给出了需要组合的元素，\n",
    "#         一般example和label分别代表训练样本和这个样本对应的正确标签。\n",
    "#     batch_size参数给出了每个batch 中样例的个数。\n",
    "#     capacity给出了队列的最大容量。\n",
    "#         当队列长度等于容量时，TensorFlow将暂停入队操作，而只是等待元索出队；\n",
    "#         当元素个数小于容量时，TensorFlow将自动重新启动入队操作。\n",
    "example_batch, label_batch = tf.train.batch([example, label], \n",
    "                                            batch_size=batch_size, \n",
    "                                            capacity=capacity)\n",
    "\n",
    "with tf.Session() as sess:\n",
    "    tf.global_variables_initializer().run()\n",
    "    tf.local_variables_initializer().run()\n",
    "    coord = tf.train.Coordinator()\n",
    "    threads = tf.train.start_queue_runners(sess=sess, coord=coord)\n",
    "    \n",
    "    # 获取并打印组合之后的样例。真实问题中，这个输出一般会作为神经网络的输入\n",
    "    for i in range(3):\n",
    "        cur_example_batch, cur_label_batch = sess.run([example_batch, label_batch])\n",
    "        print(cur_example_batch, cur_label_batch)\n",
    "        \n",
    "    coord.request_stop()\n",
    "    coord.join(threads)\n",
    "    \n",
    "# 从这个输出可以看到tf.train.batch 函数可以将单个的数据组织成3个一组的batch,\n",
    "# Eexample, label 中读到的数据依次为：\n",
    "# example: 0, lable: 0\n",
    "# example: 0, lable: 1\n",
    "# example: 1, lable: 0\n",
    "# example: 1, lable: 0\n",
    "# 这是因为tf.train.batch函数不会随机打乱顺序，所以组合之后得到的数据组合成了上面给出的输出。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "下面一段代码展示了`tf,train.shuffle_batch`函数的使用方式："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[0 1 0] [0 1 1]\n",
      "[1 1 0] [1 0 0]\n",
      "[1 1 0] [0 1 1]\n"
     ]
    }
   ],
   "source": [
    "# 和tf.train.batch的样例代码一样产生example和label。\n",
    "example, label = features ['i'], features ['j']\n",
    "\n",
    "batch_size = 3\n",
    "capacity = 1000 + 3 * batch_size\n",
    "\n",
    "# 使用tf.train.shuffle_batch函数来组合样例。\n",
    "# tf.train.shuffle_batch函数的参数大部分和tf.train.batch函数相似，\n",
    "# 但是min_after_dequeue参数是tf.train.shuffle_batch函数特有的。\n",
    "# min_after_dequeue 参数限制了出队时队列中元素的最少个数，\n",
    "# 因为当队列中元素太少时，随机打乱样例顺序的作用就不太了。\n",
    "# 当出队函数被调用但是队列中元素个数不够时，出队操作将等待里多的元素入队才会完成。\n",
    "# 如果min_after_dequeue参数被设定，capacity也应该相应调整来满足性能需求。\n",
    "example_batch, label_batch = tf.train.shuffle_batch(\n",
    "    [example, label], batch_size=batch_size,\n",
    "    capacity=capacity, min_after_dequeue=30)\n",
    "\n",
    "# 和tf.train.batch的样例代码一样打印example_batch, label_batch。\n",
    "with tf.Session() as sess:\n",
    "    tf.global_variables_initializer().run()\n",
    "    tf.local_variables_initializer().run()\n",
    "    coord = tf.train.Coordinator()\n",
    "    threads = tf.train.start_queue_runners(sess=sess, coord=coord)\n",
    "    \n",
    "    # 获取并打印组合之后的样例。真实问题中，这个输出一般会作为神经网络的输入\n",
    "    for i in range(3):\n",
    "        cur_example_batch, cur_label_batch = sess.run([example_batch, label_batch])\n",
    "        print(cur_example_batch, cur_label_batch)\n",
    "        \n",
    "    coord.request_stop()\n",
    "    coord.join(threads)\n",
    "    \n",
    "# 从输出可以看到，得到的样例顺序已经被打乱了。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**`tf.train.batch`函数和`tf.train.shuffle_batch`函数除了可以将单个训练数据整理成输入batch，也提供了并行化处理输入数据的方法。**二者并行化的方式一致，所以在本节中仅以应用得更多的后者为例：\n",
    "\n",
    "- 设置`tf.train.shuffle_batch`函数中的num_threads参数，可以指定多个线程同时执行入队操作。`tf.train.shuffle_batch`函数的入队操作就是数据读取以及预处理的过程。当num_threads参数大于1时，多个线程会同时读取一个文件中的不同样例并进行预处理。如果需要多个线程处理不同文件中的样例时，可以使用`tf.train.shuffle_batch_join`函数，此函数会从输入文件队列中获取不同的文件分配给不同的线程。一般来说，输入文件队列是通过7.3.2中介绍的`tf.train.string_input_producer`函数生成的，这个函数会平均分配文件以保证不同文件中的数据会被尽量平均地使用。\n",
    "\n",
    "`tf.train.shuffle_batch`和`tf.train.shuffle_batch_join`函数都可以完成多线程并行的方式来进行数据的处理，它们各有优劣：\n",
    "- 对于`tf.train.shuffle_batch` 函数，不同线程会读取同一个文件。如果一个文件中的样例比较相似（比如都属于同一个类别），那么神经网络的训练效果有可能会受到影响。所以在使用`tf.train.shuffle_batch`函数时，需要尽量将同一个TFRecord 文件中的样例随机打乱。\n",
    "- 而使用`tf.train.shuffle_batch_join`函数时，不同线程会读取不同文件。如果读取数据的线程数比总文件数还大，那么多个线程可能会读取同一个文件中相近部分的数据。而且多个线程读取多个文件可能导致过多的硬盘寻址，从而使得读取效率降低。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 7.3.4 输入数据处理框架\n",
    "综合了上面讲的整个流程，下面给出了一个完整的TensorFlow来处理输入数据的样例："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "After 0 training step(s), loss is 307.707 \n",
      "After 1000 training step(s), loss is 2.65173 \n",
      "After 2000 training step(s), loss is 2.3936 \n",
      "After 3000 training step(s), loss is 2.57201 \n",
      "After 4000 training step(s), loss is 2.24478 \n"
     ]
    }
   ],
   "source": [
    "import tensorflow as tf\n",
    "\n",
    "# 1. 创建立件列表，并通过文件列表创建输入文件队列。在调用输入数据处理流程前，需要统一\n",
    "# 所有原始数据的格式并将它们存储到TFRecord文件中。下面给出的文件列表中应该包含所有\n",
    "# 提供训练数据的TFRecord 文件。\n",
    "files = tf.train.match_filenames_once(\"output.tfrecords\")\n",
    "filename_queue = tf.train.string_input_producer(files, shuffle=False) \n",
    "\n",
    "\n",
    "# 2. 解析TFRecord文件里的数据。这里假设image_raw中存储的是图像\n",
    "# 的原始数据，pixels代表图片的像素数，label为该样例所对应的标签。\n",
    "reader = tf.TFRecordReader()\n",
    "_,serialized_example = reader.read(filename_queue)\n",
    "features = tf.parse_single_example(\n",
    "    serialized_example,\n",
    "    features={\n",
    "        'image_raw':tf.FixedLenFeature([],tf.string),\n",
    "        'pixels':tf.FixedLenFeature([],tf.int64),\n",
    "        'label':tf.FixedLenFeature([],tf.int64)\n",
    "    })\n",
    "\n",
    "# 将原始图像数据解析出像素矩阵\n",
    "decoded_images = tf.decode_raw(features['image_raw'], tf.uint8)\n",
    "retyped_images = tf.cast(decoded_images, tf.float32)\n",
    "images = tf.reshape(retyped_images, [784])\n",
    "labels = tf.cast(features['label'], tf.int32)\n",
    "#pixels = tf.cast(features['pixels'], tf.int32)\n",
    "\n",
    "# 3. 将文件以100个为一组打包\n",
    "min_after_dequeue = 10000\n",
    "batch_size = 100\n",
    "capacity = min_after_dequeue + 3 * batch_size\n",
    "\n",
    "image_batch, label_batch = tf.train.shuffle_batch([images, labels], \n",
    "                                                    batch_size=batch_size, \n",
    "                                                    capacity=capacity, \n",
    "                                                    min_after_dequeue=min_after_dequeue)\n",
    "\n",
    "\n",
    "# 4. 建立NN模型，训练\n",
    "# 定义模型结构\n",
    "def inference(input_tensor, weights1, biases1, weights2, biases2):\n",
    "    layer1 = tf.nn.relu(tf.matmul(input_tensor, weights1) + biases1)\n",
    "    return tf.matmul(layer1, weights2) + biases2\n",
    "\n",
    "# 模型相关的参数\n",
    "INPUT_NODE = 784\n",
    "OUTPUT_NODE = 10\n",
    "LAYER1_NODE = 500\n",
    "REGULARAZTION_RATE = 0.0001   \n",
    "TRAINING_STEPS = 5000        \n",
    "\n",
    "weights1 = tf.Variable(tf.truncated_normal([INPUT_NODE, LAYER1_NODE], stddev=0.1))\n",
    "biases1 = tf.Variable(tf.constant(0.1, shape=[LAYER1_NODE]))\n",
    "\n",
    "weights2 = tf.Variable(tf.truncated_normal([LAYER1_NODE, OUTPUT_NODE], stddev=0.1))\n",
    "biases2 = tf.Variable(tf.constant(0.1, shape=[OUTPUT_NODE]))\n",
    "\n",
    "y = inference(image_batch, weights1, biases1, weights2, biases2)\n",
    "    \n",
    "# 计算交叉熵及其平均值\n",
    "cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=label_batch)\n",
    "cross_entropy_mean = tf.reduce_mean(cross_entropy)\n",
    "    \n",
    "# 损失函数的计算\n",
    "regularizer = tf.contrib.layers.l2_regularizer(REGULARAZTION_RATE)\n",
    "regularaztion = regularizer(weights1) + regularizer(weights2)\n",
    "loss = cross_entropy_mean + regularaztion\n",
    "\n",
    "# 优化损失函数\n",
    "train_step = tf.train.GradientDescentOptimizer(0.01).minimize(loss)\n",
    "    \n",
    "# 初始化会话，并开始训练过程。\n",
    "with tf.Session() as sess:\n",
    "    # tf.global_variables_initializer().run()\n",
    "    sess.run((tf.global_variables_initializer(),\n",
    "              tf.local_variables_initializer()))\n",
    "    coord = tf.train.Coordinator()\n",
    "    threads = tf.train.start_queue_runners(sess=sess, coord=coord)\n",
    "    # 循环的训练神经网络。\n",
    "    for i in range(TRAINING_STEPS):\n",
    "        if i % 1000 == 0:\n",
    "            print(\"After %d training step(s), loss is %g \" % (i, sess.run(loss)))\n",
    "                  \n",
    "        sess.run(train_step)\n",
    "        \n",
    "    coord.request_stop()\n",
    "    coord.join(threads)  "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "下图展示了以上代码中输入数据处理的整个流程。\n",
    "\n",
    "从图中可以看出，输入数据处理的第一步为通过`tf.train.match_filenames_once`获取存储训练数据的文件列表，在下图中，这个文件列表为{A,B,C}。\n",
    "\n",
    "然后通过`tf.train.string_input_producer`函数，可以选择性地将文件列表中文件的顺序打乱，并加入输入队列（因为是否打乱文件的顺序是可选的，所以在下中通过虚线表示）。`tf.train.string_input_producer`函数会生成并维护一个输入文件队列，不同线程中的文件读取函数可以共享这个输入文件队列。\n",
    "\n",
    "在读取样例数据之后，需要将图像进行预处理。图像预处理的过程也会通过`tf.train.shuffle_batch`提供的机制并行地跑在多个线程中。输入数据处理流程的最后通过`tf.train.shuffle_batch`函数将处理好的单个输入样例整理成batch提供给神经网络的输入层。\n",
    "\n",
    "**通过这种方式，可以有效地提高数据预处理的效率，避免数据预处理成为神经网络模型训练过程中的性能瓶颈。**\n",
    "<p align='center'>\n",
    "    <img src=images/图7.15.JPG>\n",
    "</p>"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
